package com.jiuyv.flink.streaming.core.logs;

import com.jiuyv.flink.streaming.common.enums.SqlCommand;
import com.jiuyv.flink.streaming.common.model.SqlCommandCall;

import org.apache.flink.table.api.TableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Description: TODO(这里用一句话描述这个类的作用) 
 * @author  shu_k
 * @date 2021年3月31日 下午6:18:15
 */
public class LogPrint {
	
	private static final Logger log = LoggerFactory.getLogger(LogPrint.class);

    /**
     * 打印SqlCommandCall 日志信息
     *
     * @date 2021/3/21
     * @time 11:25
     */
    public static void logPrint(SqlCommandCall sqlCommandCall) {
        if (sqlCommandCall == null) {
            throw new NullPointerException("sqlCommandCall is null");
        }
        System.out.println("\n #############" + sqlCommandCall.sqlCommand.name() + "############# \n"
                + sqlCommandCall.operands[0]);
        log.info("\n #############{}############# \n {}", sqlCommandCall.sqlCommand.name(),
                sqlCommandCall.operands[0]);
    }

    /**
     * show 语句  select语句结果打印
     *
     * @author zhuhuipei
     * @date 2021/3/21
     * @time 11:23
     */
    public static void queryRestPrint(TableEnvironment tEnv, SqlCommandCall sqlCommandCall) {
        if (sqlCommandCall == null) {
            throw new NullPointerException("sqlCommandCall is null");
        }
        LogPrint.logPrint(sqlCommandCall);


        if (sqlCommandCall.getSqlCommand().name().equalsIgnoreCase(SqlCommand.SELECT.name())) {
            throw new RuntimeException("目前不支持select 语法使用");
        } else {
            tEnv.executeSql(sqlCommandCall.operands[0]).print();
        }

		/*
		 * if (sqlCommandCall.getSqlCommand().name().equalsIgnoreCase(SqlCommand.SELECT.
		 * name())) { Iterator<Row> it =
		 * tEnv.executeSql(sqlCommandCall.operands[0]).collect(); while (it.hasNext()) {
		 * String res = String.join(",", PrintUtils.rowToString(it.next()));
		 * log.info("数据结果 {}", res); } }
		 */
    }

}
